"""
Case Type   : 发布订阅冲突解决
Case Name   : 禁用/激活创建时未激活的订阅
Create At   : 2024/04/11
Owner       : @songjing20
Description :
    1.在两个集群创建表
    2.创建发布订阅 enabled=off
    3.查询系统表，禁用订阅
    4.发布端插入数据，订阅端查询
    5.激活订阅，查询系统表
    6.发布端更新数据，订阅端查询数据同步
    7.清理环境
Expect      :
    1.成功
    2.成功
    3.系统表subenabled字段值为f，禁用成功，无WARNING
    4.无数据
    5.成功，subenabled字段值为t
    6.成功，3条数据
    7.成功
History     :
"""

import os
import time
import unittest

from yat.test import macro
from yat.test import Node
from testcase.utils.Logger import Logger
from testcase.utils.CommonSH import CommonSH
from testcase.utils.Common import Common
from testcase.utils.Constant import Constant

Primary_SH = CommonSH('PrimaryDbUser')


@unittest.skipIf(3 > Primary_SH.get_node_num(), '非1+2以上环境不执行')
class Pubsubclass(unittest.TestCase):
    def setUp(self):
        self.log = Logger()
        self.log.info(f"-----{os.path.basename(__file__)} start-----")
        self.pri_userdb_pub = Node(node='PrimaryDbUser')
        self.pri_userdb_sub = Node(node='remote1_PrimaryDbUser')
        self.constant = Constant()
        self.commsh_pub = CommonSH('PrimaryDbUser')
        self.commsh_sub = CommonSH('remote1_PrimaryDbUser')
        self.com_pub = Common()
        self.com_sub = Common('remote1_PrimaryDbUser')
        self.tb_name = 't_pubsub_0343'
        self.pubname = 'pub_pubsub_0343'
        self.subname = 'sub_pubsub_0343'
        self.port = str(int(self.pri_userdb_pub.db_port) + 1)
        self.wal_level = self.com_pub.show_param("wal_level")
        self.user_param_pub = f'-U {self.pri_userdb_pub.db_user} ' \
                              f'-W {self.pri_userdb_pub.db_password}'
        self.user_param_sub = f'-U {self.pri_userdb_sub.db_user} ' \
                              f'-W {self.pri_userdb_sub.db_password}'
        text = '-----查询集群状态，修改pg_hba，发布端设置wal_level=logical-----'
        self.log.info(text)
        self.log.info('----- 发布端状态 -----')
        status = self.commsh_pub.get_db_cluster_status('detail')
        self.log.info(status)
        self.assertEqual(status.count('Normal'), 4, f'执行失败:{text}')
        self.log.info('----- 订阅端状态 -----')
        status = self.commsh_sub.get_db_cluster_status(
            'detail', env_path=macro.DB_ENV_PATH_REMOTE1)
        self.log.info(status)
        self.assertEqual(status.count('Normal'), 4, f'执行失败:{text}')
        guc_res = self.commsh_pub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, '',
            f'host  replication  {self.pri_userdb_sub.db_user}  '
            f'{self.pri_userdb_sub.db_host}/32 sha256')
        self.log.info(guc_res)
        self.assertTrue(guc_res, f'发布端配置流复制权限失败:{text}')
        if 'logical' != self.wal_level:
            res = self.commsh_pub.execute_gsguc(
                'set', self.constant.GSGUC_SUCCESS_MSG, 'wal_level=logical')
            self.assertTrue(res, f'发布端设置wal_level失败:{text}')
            restart_res = self.commsh_pub.restart_db_cluster()
            self.assertTrue(restart_res, f'重启集群失败:{text}')
        guc_res = self.commsh_sub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, macro.DB_INSTANCE_PATH_REMOTE1,
            f'host  replication  {self.pri_userdb_pub.db_user}  '
            f'{self.pri_userdb_pub.db_host}/32 sha256',
            macro.DB_ENV_PATH_REMOTE1)
        self.log.info(guc_res)
        self.assertTrue(guc_res, f'订阅端配置流复制权限失败:{text}')

    def test_pubsub(self):
        text = '-----step1:在两个集群创建表;expect:成功-----'
        self.log.info(text)
        sql = f"drop table if exists {self.tb_name} cascade;" \
              f"create table {self.tb_name}(" \
              f"c_int int primary key, c_varchar varchar(13));"
        pub_res = self.commsh_pub.execut_db_sql(sql, self.user_param_pub)
        self.log.info(pub_res)
        self.assertEqual(pub_res.count(self.constant.CREATE_TABLE_SUCCESS), 2,
                         f'发布端建表失败:{text}')
        sub_res = self.commsh_sub.execut_db_sql(
            sql, self.user_param_sub, env_path=macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_res)
        self.assertEqual(pub_res.count(self.constant.CREATE_TABLE_SUCCESS), 2,
                         f'订阅端建表失败:{text}')

        text = '-----step2:创建发布订阅;expect:成功-----'
        self.log.info(text)
        create_pub = f"drop publication if exists {self.pubname};" \
                     f"create publication {self.pubname} for all tables;"
        pub_res = self.commsh_pub.execut_db_sql(create_pub, self.user_param_pub)
        self.log.info(pub_res)
        self.assertIn(self.constant.create_pub_succ_msg, pub_res,
                      f'执行失败:{text}')
        guc_res = self.commsh_sub.execute_generate(
            macro.COMMON_PASSWD, env_path=macro.DB_ENV_PATH_REMOTE1)
        self.assertIn(self.constant.create_keycipher_success, guc_res,
                      f'执行失败:{text}')
        create_sub = f"drop subscription if exists {self.subname} cascade;" \
                     f"create subscription {self.subname} connection " \
                     f"'host={self.pri_userdb_pub.db_host} " \
                     f"port={self.port} " \
                     f"user={self.pri_userdb_pub.db_user} " \
                     f"dbname={self.pri_userdb_pub.db_name} " \
                     f"password={self.pri_userdb_pub.ssh_password}' " \
                     f"publication {self.pubname} with(enabled = off);"
        sub_res = self.commsh_sub.execut_db_sql(
            create_sub, self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_res)
        self.assertIn(self.constant.create_sub_succ_msg, sub_res,
                      f'创建订阅失败:{text}')

        text = '-----step3:查询系统表，禁用订阅;' \
               'expect:系统表subenabled字段值为f，禁用成功，无WARNING-----'
        self.log.info(text)
        sub_res = self.commsh_sub.execut_db_sql(
            f"select subname, subenabled, subskiplsn from pg_subscription;"
            f"alter subscription {self.subname} set(enabled=off);"
            f"alter subscription {self.subname} disable;",
            self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_res)
        self.assertEqual(sub_res.count(self.constant.alter_sub_succ_msg), 2,
                         f'执行失败:{text}')
        self.assertNotIn("WARNING", sub_res, f'执行失败:{text}')
        self.assertIn(f"{self.subname} | f", sub_res, f'执行失败:{text}')

        text = '-----step4:发布端插入数据，订阅端查询;expect:无数据-----'
        self.log.info(text)
        pub_res = self.commsh_pub.execut_db_sql(
            f"insert into {self.tb_name} values(1,'pub1');",
            self.user_param_pub)
        self.log.info(pub_res)
        self.assertIn(self.constant.INSERT_SUCCESS_MSG, pub_res,
                      f'执行失败:{text}')
        time.sleep(6)
        sub_res = self.commsh_sub.execut_db_sql(
            f"select * from {self.tb_name};", self.user_param_sub, None,
            macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_res)
        self.assertIn('(0 rows)', sub_res, f'执行失败:{text}')

        text = '-----step5:激活订阅，查询系统表;expect:成功，系统表subenabled字段值为t-----'
        self.log.info(text)
        sub_res = self.commsh_sub.execut_db_sql(
            f"alter subscription {self.subname} set(enabled = on);"
            f"select subname, subenabled from pg_subscription;",
            self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_res)
        self.assertIn(self.constant.alter_sub_succ_msg, sub_res,
                      f'执行失败:{text}')
        self.assertIn(f"{self.subname} | t", sub_res, f'执行失败:{text}')

        text = '-----step6:发布端更新数据，订阅端查询是否同步;expect:同步,3条数据-----'
        self.log.info(text)
        pub_res = self.commsh_pub.execut_db_sql(
            f"insert into {self.tb_name} values(2,'pub2'),(3,'pub3');"
            f"update {self.tb_name} set c_varchar='sub1' where c_int = 1",
            self.user_param_pub)
        self.log.info(pub_res)
        self.assertIn(self.constant.INSERT_SUCCESS_MSG, pub_res,
                      f'执行失败:{text}')
        self.assertIn(self.constant.UPDATE_SUCCESS_MSG, pub_res,
                      f'执行失败:{text}')
        time.sleep(6)
        sub_res = self.commsh_sub.execut_db_sql(
            f"select * from {self.tb_name} order by 1;",
            self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_res)
        self.assertIn('1 | sub1\n     2 | pub2\n     3 | pub3\n(3 rows)',
                      sub_res, f'执行失败:{text}')

    def tearDown(self):
        text = '-----step7:清理环境;expect:成功-----'
        self.log.info(text)
        sub_res = self.commsh_sub.execut_db_sql(
            f"drop subscription if exists {self.subname};",
            self.user_param_sub, env_path=macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_res)
        drop_pub_res = self.commsh_pub.execut_db_sql(
            f"drop publication if exists {self.pubname};", self.user_param_pub)
        self.log.info(drop_pub_res)
        drop_tb = f"drop table if exists {self.tb_name} cascade;"
        drop_tb_sub = self.commsh_sub.execut_db_sql(
            drop_tb, self.user_param_sub, env_path=macro.DB_ENV_PATH_REMOTE1)
        self.log.info(drop_tb_sub)
        drop_tb_pub = self.commsh_pub.execut_db_sql(
            drop_tb, self.user_param_pub)
        self.log.info(drop_tb_pub)
        wal_level_res = self.commsh_pub.execute_gsguc(
            'set', self.constant.GSGUC_SUCCESS_MSG,
            f'wal_level={self.wal_level}')
        self.log.info(wal_level_res)
        restart_res = self.commsh_pub.restart_db_cluster()
        pub_guc_res = self.commsh_pub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '', 'all', False, False,
            '', f'host    replication  {self.pri_userdb_sub.db_user} '
            f'{self.pri_userdb_sub.db_host}/32 ')
        self.log.info(pub_guc_res)
        sub_guc_res = self.commsh_sub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, macro.DB_INSTANCE_PATH_REMOTE1,
            f'host    replication  {self.pri_userdb_pub.db_user} '
            f'{self.pri_userdb_pub.db_host}/32',
            macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_guc_res)
        self.assertIn(self.constant.drop_pub_succ_msg, drop_pub_res,
                      f'删除发布失败:{text}')
        self.assertIn(self.constant.drop_sub_succ_msg, sub_res,
                      f'删除订阅失败:{text}')
        self.assertIn(self.constant.TABLE_DROP_SUCCESS, drop_tb_pub,
                      f'发布端删表失败:{text}')
        self.assertIn(self.constant.TABLE_DROP_SUCCESS, drop_tb_sub,
                      f'订阅端删表失败:{text}')
        self.assertTrue(wal_level_res, f'执行失败:{text}')
        self.assertTrue(restart_res, f'重启发布端集群失败:{text}')
        self.assertTrue(pub_guc_res, f'执行失败:{text}')
        self.assertTrue(sub_guc_res, f'执行失败:{text}')
        self.log.info(f"-----{os.path.basename(__file__)} end-----")
